Using Spark SQL DDL and DML with Delta Lake
Today's post can be read as a Delta Lake quick start, Spark SQL edition.
We can execute SQL through spark.sql in Python, Scala, or Java, or run the Spark Thrift Server (see the Spark Thrift Server quick start) and execute SQL directly from a SQL client (such as DBeaver) or a BI tool (such as Superset or Metabase).
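For example, the spark.sql route takes only a couple of lines. A trivial sketch, assuming a SparkSession named spark already exists:

# Run a SQL statement from PySpark and display the result
spark.sql("SELECT 1 AS ok").show()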
Test data
Lending Club[1]
Download link: https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet
Running the Spark Thrift Server
This post shows another way to start the Spark Thrift Server, and uses ipython-sql to run SQL against it.
from pyspark.sql import SparkSession

# Initialize a SparkSession with Hive support, the Delta Lake package,
# and the Delta SQL extension and catalog
spark = SparkSession.builder.appName("SparkSQL") \
    .config("hive.server2.thrift.port", "10000") \
    .config("spark.sql.hive.thriftServer.singleSession", True) \
    .enableHiveSupport() \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Start the Spark Thrift Server inside this JVM, passing it the SparkSession
sc = spark.sparkContext
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver \
    .HiveThriftServer2.startWithContext(spark._jwrapped)
The code above initializes the SparkSession and starts the Spark Thrift Server on port 10000.
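From a notebook, the Thrift Server can then be queried with ipython-sql. The snippet below is only a sketch; it assumes ipython-sql and PyHive (with its Thrift/SASL dependencies) are installed and that the server is reachable on localhost:10000.

# Load the ipython-sql extension and connect through PyHive's SQLAlchemy dialect
%load_ext sql
%sql hive://localhost:10000/default
# Subsequent cells can now run SQL directly
%sql SHOW DATABASES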
Delta tables and Spark SQL
Viewing databases
show databases; -- returns default
show tables;    -- returns empty (no tables yet)
Creating a Delta table
CREATE TABLE loan_risk_delta (
  loan_id bigint,
  funded_amnt int,
  paid_amnt double,
  addr_state string
)
USING DELTA
LOCATION '/tmp/loan_risk_delta'
Loading data
Load from Parquet, or from a Spark DataFrame (the DataFrame route is sketched after the CTAS example below).
INSERT INTO loan_risk_delta
SELECT * FROM parquet.`SAISEU19-loan-risks.snappy.parquet`;
SELECT COUNT(*) FROM loan_risk_delta;
-- returns 14705
The CREATE TABLE and INSERT statements above can also be combined into a single step:
CREATE TABLE loan_risk_delta1
USING DELTA
LOCATION '/tmp/loan_risk_delta1'
AS SELECT * FROM parquet.`SAISEU19-loan-risks.snappy.parquet`
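For the Spark DataFrame route mentioned above, a minimal sketch using the standard Delta writer API, appending the same Parquet data into the table's location:

# Read the Parquet file into a DataFrame and append it to the Delta table,
# equivalent to the INSERT INTO statement above
df = spark.read.parquet("SAISEU19-loan-risks.snappy.parquet")
df.write.format("delta").mode("append").save("/tmp/loan_risk_delta")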
Modifying the table
CREATE OR REPLACE TABLE loan_risk_delta (
  loan_id bigint,
  funded_amnt int,
  paid_amnt double,
  addr_state string
)
USING DELTA
PARTITIONED BY (addr_state)
LOCATION '/tmp/loan_risk_delta'
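To confirm that the replacement took effect, Delta's DESCRIBE DETAIL command shows the table's format, location, and partition columns. A quick sketch, run here through spark.sql:

# Check the table's location and partition columns after the replace
spark.sql("DESCRIBE DETAIL loan_risk_delta") \
    .select("format", "location", "partitionColumns") \
    .show(truncate=False)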
Appending data
INSERT INTO loan_risk_delta
SELECT * FROM loan_risk_delta1;
SELECT COUNT(*) FROM loan_risk_delta;
-- returns 29410
Overwriting
INSERT OVERWRITE loan_risk_delta
SELECT * FROM loan_risk_delta1;
SELECT COUNT(*) FROM loan_risk_delta
-- returns 14705
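Overwriting does not discard the old data: Delta keeps every version of the table, so the state before the overwrite is still readable via time travel. A sketch with the DataFrame reader; the version number here is only illustrative, and DESCRIBE HISTORY (shown further below) lists the actual versions:

# Read an earlier version of the table; the version number is illustrative
old_df = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/tmp/loan_risk_delta")
print(old_df.count())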
Deleting and updating
DELETE FROM loan_risk_delta
WHERE addr_state='WY';
UPDATE loan_risk_delta
SET addr_state = 'West Virginia'
WHERE addr_state = 'WV';
SELECT addr_state, COUNT(*) as cnt
FROM loan_risk_delta
WHERE addr_state in ('WY','West Virginia','WV')
GROUP BY 1;
-- returns:
addr_state      cnt
-------------------
West Virginia    60
Upsert
-- Upsert data to a target Delta
-- table using merge
MERGE INTO loan_risk_delta
USING loan_risk_delta1
ON loan_risk_delta.loan_id = loan_risk_delta1.loan_id
WHEN MATCHED THEN
  UPDATE SET loan_risk_delta.addr_state = loan_risk_delta1.addr_state
WHEN NOT MATCHED THEN
  INSERT (loan_id, funded_amnt, paid_amnt, addr_state)
  VALUES (loan_id, funded_amnt, paid_amnt, addr_state);
SELECT addr_state, COUNT(*) as cnt
FROM loan_risk_delta
WHERE addr_state in ('WY','West Virginia','WV')
GROUP BY 1;
-- returns:
addr_state   cnt
----------------
WY            31
WV            60
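Every one of these writes (insert, overwrite, delete, update, merge) is recorded as a commit in the Delta transaction log, which can be inspected with DESCRIBE HISTORY. A minimal sketch:

# List the table's versions and the operation that produced each one
spark.sql("DESCRIBE HISTORY loan_risk_delta") \
    .select("version", "timestamp", "operation") \
    .show(truncate=False)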
Unified batch and streaming
Helper functions to simulate a data stream:
import random
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Each run gets its own random checkpoint directory under the table path
def random_checkpoint_dir(table_path):
    return f"{table_path}/chkpt/{random.randint(0, 10000)}"

# UDF: pick a random state
states = ["CA", "TX", "NY", "IA"]

@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))

# Generate a random data stream and append it to the Delta table
def generate_and_append_data_stream(table_format, table_path):
    stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \
        .withColumn("loan_id", 10000 + col("value")) \
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
        .withColumn("addr_state", random_state()) \
        .select("loan_id", "funded_amnt", "paid_amnt", "addr_state")

    query = stream_data.writeStream \
        .format(table_format) \
        .option("checkpointLocation", random_checkpoint_dir(table_path)) \
        .trigger(processingTime="10 seconds") \
        .start(table_path)

    return query
Start two streaming writes into the same Delta table:
delta_path = "/tmp/loan_risk_delta"
stream_1 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)
stream_2 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)
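To round out the batch-plus-streaming picture, the same Delta table can also be consumed as a streaming source while the two writers above are running. A minimal sketch with a console sink (the trigger interval is an arbitrary choice):

# Read the Delta table as a stream and print running counts per state
count_query = spark.readStream.format("delta").load(delta_path) \
    .groupBy("addr_state").count() \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime="10 seconds") \
    .start()

The stop_all_streams() helper further below stops this query too, since it iterates over all active streams.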
Testing
Each time you now run
SELECT COUNT(*) FROM loan_risk_delta
you will see the row count increasing.
Stopping the streams
import shutil

def stop_all_streams():
    # Stop all the streams
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")

    # Delete the streaming checkpoints
    print("Deleting checkpoints")
    shutil.rmtree("/tmp/loan_risk_delta/chkpt/", True)
    print("Deleted checkpoints")

stop_all_streams()
Stopping Spark and the Spark Thrift Server
spark.stop()
References
[1] Lending Club: https://www.kaggle.com/wendykan/lending-club-loan-data